Lambda 関数を利用して、Linux OS で起動している暗号化されていない "ルートボリューム" を暗号化されたルートボリュームに自動で置き換えてみた

Lambda 関数を利用して、Linux OS で起動している暗号化されていない "ルートボリューム" を暗号化されたルートボリュームに自動で置き換えてみた

Clock Icon2024.10.04

はじめに

テクニカルサポートの 片方 です。
Lambda 関数を利用して暗号化されていないルートボリュームを、暗号化されたルートボリュームに自動で置き換えてみました。
Windows OS は対象外で、Linux OS のみを対象としてます。

https://repost.aws/ja/knowledge-center/ebs-change-encryption-key

実装してみた

以下の順番で実装します。

  • 実行ロール作成
  • Lambda 関数作成

元になる暗号化されていないルートボリュームの設定値やタイプはそのままに、暗号化して置き換えます。

実行ロール

※ 信頼関係

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

アタッチするポリシー例

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
               "ec2:StopInstances",
                "ec2:StartInstances",
                "ec2:DescribeInstances",
                "ec2:DescribeSnapshots",
                "ec2:DescribeVolumes",
                "ec2:CreateSnapshot",
                "ec2:CreateVolume",
                "ec2:AttachVolume",
                "ec2:DetachVolume",
                "ec2:DeleteVolume"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:<region>:<account-id>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<account-id>:log-group:/aws/lambda/<function-name>:*"
            ]
        }
    ]
}

※ 適宜修正してください。

Lambda 関数

Python 3.12 で作成しました。
実行ロールでは、既存のロールを使用するを選択し、先ほど作成したロールを指定します。

実装する Lambda 関数例
import boto3
import time

ec2_client = boto3.client('ec2')

def lambda_handler(event, context):
    # 暗号化されていないルートボリュームを検索
    unencrypted_volumes = find_unencrypted_root_volumes()

    if not unencrypted_volumes:
        print("No unencrypted root volumes found.")
        return

    for volume in unencrypted_volumes:
        volume_id = volume['VolumeId']
        print(f"Unencrypted root volume {volume_id} found.")

        # アタッチされているインスタンスIDを取得
        instance_id = get_attached_instance(volume)

        if not instance_id:
            print(f"Volume {volume_id} is not attached to any instance.")
            continue

        # インスタンスのOSがLinuxであることを確認
        if not is_linux_instance(instance_id):
            print(f"Instance {instance_id} is not Linux. Skipping.")
            continue

        print(f"Volume {volume_id} is attached to Linux instance {instance_id}. Proceeding with encryption...")

        # インスタンスを停止
        stop_instance(instance_id)

        # AvailabilityZoneとボリュームタイプを取得
        availability_zone = volume['AvailabilityZone']
        volume_type = volume['VolumeType']
        iops = volume.get('Iops')  # IOPSを取得 (io1, io2の場合)

        # スナップショット作成
        snapshot_id = create_snapshot(volume_id)

        # 暗号化された新しいボリュームを作成
        encrypted_volume_id = create_encrypted_volume(snapshot_id, availability_zone, volume_type, iops)

        # 既存のルートボリュームをデタッチ
        detach_volume(instance_id, volume_id)

        # 新しい暗号化されたボリュームをルートボリュームとしてアタッチ
        attach_volume(instance_id, encrypted_volume_id, volume['Attachments'][0]['Device'])

        # 古い未暗号化ルートボリュームを削除
        delete_volume(volume_id)

        # インスタンスを再起動
        start_instance(instance_id)

def find_unencrypted_root_volumes():
    # 暗号化されていないルートボリュームを取得
    response = ec2_client.describe_volumes(Filters=[{'Name': 'encrypted', 'Values': ['false']}])
    unencrypted_volumes = []

    for volume in response['Volumes']:
        instance_id = get_attached_instance(volume)
        if instance_id and is_root_volume(volume, instance_id):
            unencrypted_volumes.append(volume)

    return unencrypted_volumes

def get_attached_instance(volume):
    # アタッチされているインスタンスIDを取得
    if 'Attachments' in volume and volume['Attachments']:
        return volume['Attachments'][0]['InstanceId']
    return None

def is_root_volume(volume, instance_id):
    # ルートデバイスかどうかを確認
    instance_info = ec2_client.describe_instances(InstanceIds=[instance_id])
    root_device_name = instance_info['Reservations'][0]['Instances'][0]['RootDeviceName']
    return volume['Attachments'][0]['Device'] == root_device_name

def is_linux_instance(instance_id):
    # インスタンスがLinuxかどうかを確認
    instance_info = ec2_client.describe_instances(InstanceIds=[instance_id])
    platform_details = instance_info['Reservations'][0]['Instances'][0].get('PlatformDetails', '')
    # Linuxであることを確認
    return 'Linux' in platform_details

def create_snapshot(volume_id):
    # スナップショット作成
    response = ec2_client.create_snapshot(VolumeId=volume_id, Description=f"Snapshot of {volume_id}")
    snapshot_id = response['SnapshotId']
    print(f"Snapshot {snapshot_id} created for volume {volume_id}")
    wait_for_snapshot(snapshot_id)
    return snapshot_id

def create_encrypted_volume(snapshot_id, availability_zone, volume_type, iops=None):
    # 暗号化されたボリュームを作成
    create_volume_params = {
        'SnapshotId': snapshot_id,
        'AvailabilityZone': availability_zone,
        'VolumeType': volume_type,
        'Encrypted': True
    }

    if volume_type in ['io1', 'io2'] and iops:
        create_volume_params['Iops'] = iops

    response = ec2_client.create_volume(**create_volume_params)
    encrypted_volume_id = response['VolumeId']
    print(f"Encrypted volume {encrypted_volume_id} created from snapshot {snapshot_id} with type {volume_type}")
    wait_for_volume_state(encrypted_volume_id, 'available')
    return encrypted_volume_id

def detach_volume(instance_id, volume_id):
    # ボリュームをインスタンスからデタッチ
    ec2_client.detach_volume(VolumeId=volume_id, InstanceId=instance_id, Force=True)
    print(f"Detaching volume {volume_id} from instance {instance_id}...")
    wait_for_volume_state(volume_id, 'available')

def attach_volume(instance_id, volume_id, device_name):
    # 新しいボリュームをインスタンスにアタッチ
    ec2_client.attach_volume(VolumeId=volume_id, InstanceId=instance_id, Device=device_name)
    print(f"Attaching volume {volume_id} to instance {instance_id} as {device_name}...")
    check_volume_attached(volume_id)

def check_volume_attached(volume_id):
    # ボリュームがアタッチされるのを待つ
    for attempt in range(10):
        volume = ec2_client.describe_volumes(VolumeIds=[volume_id])['Volumes'][0]
        if volume['State'] == 'in-use':
            print(f"Volume {volume_id} is now in-use state.")
            return
        time.sleep(5)
    raise Exception(f"Volume {volume_id} failed to reach 'in-use' state.")

def delete_volume(volume_id):
    # 古いボリュームを削除
    ec2_client.delete_volume(VolumeId=volume_id)
    print(f"Deleted volume {volume_id}")

def wait_for_volume_state(volume_id, state):
    # ボリュームのステータスを待機
    waiter = ec2_client.get_waiter(f'volume_{state}')
    try:
        waiter.wait(
            VolumeIds=[volume_id],
            WaiterConfig={
                'Delay': 15,    # 待機時間を15秒に設定
                'MaxAttempts': 40  # 最大試行回数を40回に設定(必要に応じて調整)
            }
        )
    except Exception as e:
        print(f"Error waiting for volume {volume_id} to be in {state} state: {str(e)}")
        raise
    print(f"Volume {volume_id} is now in {state} state.")

def wait_for_snapshot(snapshot_id):
    # スナップショットの完了を待機
    waiter = ec2_client.get_waiter('snapshot_completed')
    waiter.wait(SnapshotIds=[snapshot_id])
    print(f"Snapshot {snapshot_id} is now completed.")

def stop_instance(instance_id):
    # インスタンスを停止
    ec2_client.stop_instances(InstanceIds=[instance_id])
    print(f"Stopping instance {instance_id}...")
    waiter = ec2_client.get_waiter('instance_stopped')
    waiter.wait(InstanceIds=[instance_id])
    print(f"Instance {instance_id} is stopped.")

def start_instance(instance_id):
    # インスタンスを開始
    ec2_client.start_instances(InstanceIds=[instance_id])
    print(f"Starting instance {instance_id}...")
    waiter = ec2_client.get_waiter('instance_running')
    waiter.wait(InstanceIds=[instance_id])
    print(f"Instance {instance_id} is running.")

※ 適宜修正してください。
なお、暗号化に使用されるキーは、デフォルトで使用される aws/ebs この KMS キーを暗号化に使用します。

https://docs.aws.amazon.com/ja_jp/ebs/latest/userguide/work-with-ebs-encr.html

検証してみた

暗号化されていないルートボリュームを使用して Linux OS のインスタンスを起動します。
検証のため以下の OS と異なるボリュームタイプをアタッチさせて起動しました。

  • Amazon Linux 2023 : io2
  • Amazon Linux 2 : io1
  • Red Hat Enterprise Linux 9.4 : gp3
  • Ubuntu 24.04 : gp2
  • CentOS Linux 7.9.2009 : standard

無題
無題1
無題2

Lambda 関数をテストします。

無題3

無題4

暫くすると...全て成功しました。

無題5

EBS のマネジメントコンソール画面を確認します。暗号化されていました!

無題6

無題10

暗号化実施前のスナップショットも取得されています。

無題7

EC2 側でもステータスチェックが成功しており問題なく起動しています。

無題8

まとめ

本ブログが誰かの参考になれば幸いです。

参考資料

アノテーション株式会社について

アノテーション株式会社は、クラスメソッド社のグループ企業として「オペレーション・エクセレンス」を担える企業を目指してチャレンジを続けています。「らしく働く、らしく生きる」のスローガンを掲げ、様々な背景をもつ多様なメンバーが自由度の高い働き方を通してお客様へサービスを提供し続けてきました。現在当社では一緒に会社を盛り上げていただけるメンバーを募集中です。少しでもご興味あれば、アノテーション株式会社WEBサイトをご覧ください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.